feat: non-AQE DPP for native Parquet scans, broadcast exchange reuse for DPP subqueries #4011
Merged
mbutrovich merged 23 commits into apache:main on Apr 21, 2026
andygrove (Member) approved these changes on Apr 21, 2026 and left a comment:
This is amazing! Thank you @mbutrovich
🔥
andygrove added a commit to andygrove/datafusion-comet that referenced this pull request on Apr 22, 2026:
DPP support for native_datafusion scan was added in apache#4011. Update the Spark SQL test diffs so the test runs under native_datafusion by removing the IgnoreCometNativeDataFusion tag and adding a CometNativeScanExec case to getFactScan.
Which issue does this PR close?
Partially addresses #3510. Closes #4014. Related to #121. Closes #242.
Rationale for this change
`CometNativeScanExec` (the native DataFusion-based Parquet V1 scan) falls back to Spark for queries that use Dynamic Partition Pruning (DPP), even though the non-AQE DPP path works through the existing lazy partition serialization infrastructure from #3511 and #3349.

Spark has two DPP paths:

- Non-AQE: rule `PlanDynamicPruningFilters`, subquery nodes `SubqueryBroadcastExec` / `SubqueryExec`
- AQE: rule `PlanAdaptiveDynamicPruningFilters`, subquery node `SubqueryAdaptiveBroadcastExec`

The prior gate rejected both. This PR narrows it to only reject AQE DPP.
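As a rough illustration of the narrowed gate, the sketch below models the check with toy Scala types. Every type here (`SubqueryPlan`, `Expr`, `DppGateSketch`) is a hypothetical stand-in, not a Spark or Comet class, and the real `isAqeDynamicPruningFilter` may traverse expressions differently. Only the idea of rejecting filters backed by `SubqueryAdaptiveBroadcastExec` comes from this PR.

```scala
// Toy stand-ins for Spark's physical subquery nodes (assumptions, not the real classes).
sealed trait SubqueryPlan
case object SubqueryBroadcast extends SubqueryPlan         // models SubqueryBroadcastExec (non-AQE)
case object SubqueryPlain extends SubqueryPlan             // models SubqueryExec (non-AQE)
case object SubqueryAdaptiveBroadcast extends SubqueryPlan // models SubqueryAdaptiveBroadcastExec (AQE)

// Toy stand-ins for the expressions that appear in a scan's partition filters.
sealed trait Expr
final case class InSubquery(plan: SubqueryPlan) extends Expr // models InSubqueryExec
final case class DynamicPruning(child: Expr) extends Expr    // models DynamicPruningExpression
final case class Literal(value: Any) extends Expr            // any non-DPP filter

object DppGateSketch {
  // True only when the filter is backed by the AQE subquery node.
  def isAqeDynamicPruningFilter(e: Expr): Boolean = e match {
    case DynamicPruning(child)                 => isAqeDynamicPruningFilter(child)
    case InSubquery(SubqueryAdaptiveBroadcast) => true
    case _                                     => false
  }

  // The narrowed gate: non-AQE DPP filters pass, AQE DPP filters are rejected.
  def isSupported(partitionFilters: Seq[Expr]): Boolean =
    !partitionFilters.exists(isAqeDynamicPruningFilter)
}
```

With these definitions, `DppGateSketch.isSupported(Seq(DynamicPruning(InSubquery(SubqueryBroadcast))))` is true, while the same call with `SubqueryAdaptiveBroadcast` is false.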
What changes are included in this PR?
DPP gate changes
- Adds `isAqeDynamicPruningFilter` to distinguish AQE from non-AQE DPP by checking for `SubqueryAdaptiveBroadcastExec` inside `InSubqueryExec`.
- `CometScanRule.transformV1Scan()` and `CometNativeScan.isSupported()` now reject only AQE DPP.
- Removes the `spark.comet.dppFallback.enabled` config, which is redundant since `isSupported()` unconditionally rejects AQE DPP.

DPP subquery resolution
- `CometNativeScanExec.serializedPartitionData` explicitly resolves DPP subqueries via `updateResult()` before accessing file partitions. This is needed because `serializedPartitionData` can be triggered from `findAllPlanData` on a `BroadcastExchangeExec` thread, outside the normal `prepare()` -> `executeSubqueries()` flow.
- `CometNativeScanExec.outputPartitioning` uses `perPartitionData.length` instead of `originalPlan.inputRDD.getNumPartitions` to avoid triggering `FileSourceScanExec` codegen on unresolved DPP expressions.
- `CometNativeScan.convert()` uses `scan.selectedPartitions` (static) instead of `scan.getFilePartitions()` for object store option extraction, since DPP subqueries aren't resolved at planning time.

Broadcast exchange reuse (`CometSubqueryBroadcastExec`)

- `ReuseExchangeAndSubquery` runs after Comet rules. Without intervention, the join side has `CometBroadcastExchangeExec` and the DPP subquery has `BroadcastExchangeExec`: different types, no reuse, double broadcast.
- `CometSubqueryBroadcastExec` replaces `SubqueryBroadcastExec` in DPP expressions. It wraps a `CometBroadcastExchangeExec` and decodes Arrow broadcast data to extract DPP key values.
- `CometExecRule.convertSubqueryBroadcasts()` handles the conversion during `transformUp`, stripping the `CometNativeColumnarToRowExec` transition that `ApplyColumnarRulesAndInsertTransitions` inserted for the original `BroadcastExchangeExec`. Both sides now have `CometBroadcastExchangeExec`, so `ReuseExchangeAndSubquery` matches them.

Shuffle fallback
`stageContainsDPPScan` in `CometShuffleExchangeExec` checks for `FileSourceScanExec` with DPP. Scans converted to `CometNativeScanExec` don't match, so the shuffle proceeds natively. The `COMET_DPP_FALLBACK_ENABLED` config gate was removed; the check now always runs.

How are these changes tested?
- "non-AQE DPP with BHJ works with CometNativeScanExec": verifies correct results, `CometNativeScanExec` in plan, `DynamicPruningExpression` in partition filters
- "non-AQE DPP with SMJ works with CometNativeScanExec": same for sort-merge join path
- "non-AQE DPP with BHJ reuses broadcast exchange": verifies `CometSubqueryBroadcastExec`, `ReusedExchangeExec`, and exactly 1 `CometBroadcastExchangeExec`
- "DPP fallback" and "DPP fallback avoids inefficient Comet shuffle" tests updated for new message text and config removal
- `CometDppFallbackRepro3949Suite` and `CometShuffleFallbackStickinessSuite` pass with config references removed
- `CometNativeScanExec` added to `collectDynamicPruningExpressions` in `DynamicPartitionPruningSuite` and `DynamicPartitionPruningHiveScanSuite`
- `CometSubqueryBroadcastExec` added to `checkPartitionPruningPredicate`, `checkDistinctSubqueries`, `countSubqueryBroadcasts`, `countReusedSubqueryBroadcasts`, and the SPARK-38674 test
- `CometNativeScanExec` / `CometNativeColumnarToRowExec` added to the `SubquerySuite` SPARK-26893 test
- `RemoveRedundantProjectsSuite` "join with ordering requirement" unignored (exchange reuse now works)

TPC-DS native acceleration improvement (Spark 3.5, v1.4)
78 queries improved. Previously DPP queries fell back entirely; now they run natively.
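
For readers unfamiliar with why the node type matters for the broadcast exchange reuse described earlier, here is a minimal, self-contained sketch. It uses toy case classes rather than Spark's or Comet's real plan nodes, and it compares exchanges by plain equality, which is a simplification of how Spark matches plans for reuse. All names (`Exchange`, `Fresh`, `Reused`, `ReuseSketch`) are hypothetical.

```scala
import scala.collection.mutable

// Toy stand-ins for the two exchange node types (assumptions, not the real classes).
sealed trait Exchange { def child: String }
final case class BroadcastExchange(child: String) extends Exchange      // models BroadcastExchangeExec
final case class CometBroadcastExchange(child: String) extends Exchange // models CometBroadcastExchangeExec

// Result of the reuse pass: either a fresh exchange or a pointer to an earlier one.
sealed trait Node
final case class Fresh(exchange: Exchange) extends Node
final case class Reused(exchange: Exchange) extends Node // models ReusedExchangeExec

object ReuseSketch {
  // Keep the first exchange of each distinct value; mark later equal ones as reused.
  // Case-class equality includes the class itself, so different node types never match.
  def reuse(exchanges: Seq[Exchange]): Seq[Node] = {
    val seen = mutable.Set.empty[Exchange]
    exchanges.map(e => if (seen.add(e)) Fresh(e) else Reused(e))
  }

  // Number of broadcasts that would actually execute.
  def broadcastCount(nodes: Seq[Node]): Int = nodes.count(_.isInstanceOf[Fresh])
}
```

In this model, a join side of `CometBroadcastExchange("dim")` paired with a DPP subquery of `BroadcastExchange("dim")` yields two broadcasts, while two `CometBroadcastExchange("dim")` nodes yield one, which mirrors the "exactly 1 `CometBroadcastExchangeExec`" assertion in the reuse test.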